
Resilient Distributed Datasets Note

Background

This note is a summary of Chapter 12 of Spark: The Definitive Guide.

Definition

  • Immutable, partitioned collection of records
  • There is no concept of rows in an RDD; individual records are just Java/Scala/Python objects, and RDDs have no schema
  • All code in Spark compiles down to RDDs
  • Spark's Structured APIs automatically store data in an optimized, compressed binary format, whereas with RDDs you must implement that format inside your objects manually

Difference between Datasets and RDDs of Case Classes

  • Datasets take advantage of the Catalyst optimizer and the optimized binary format conversion (a short sketch follows this list)
  • Datasets do not need to serialize the whole object for every operation
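
A minimal sketch of the difference, assuming a spark-shell session; the Person case class and the sample data are illustrative, not from the book:

// illustrative case class; any product type works the same way
case class Person(name: String, age: Int)
val people = Seq(Person("Ann", 34), Person("Bob", 36))

// RDD[Person]: records are plain JVM objects serialized with Java/Kryo;
// no Catalyst optimization, and every operation deserializes whole objects
val peopleRDD = spark.sparkContext.parallelize(people)
peopleRDD.filter(p => p.age > 35).collect()

// Dataset[Person]: records live in Spark's compressed binary (Tungsten) format,
// and column-based expressions go through the Catalyst optimizer
import spark.implicits._
val peopleDS = people.toDS()
peopleDS.filter($"age" > 35).show()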

When to Use the Low-Level APIs?

When you’re calling a DataFrame transformation, it actually just becomes a set of RDD transformations

  1. You need some functionality that you cannot find in the higher-level APIs; for example, if you need very tight control over physical data placement across the cluster
  2. You need to maintain some legacy codebase written using RDDs.
  3. You need to do some custom shared variable manipulation (see the sketch after this list).
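
A minimal sketch of item 3, assuming a spark-shell session; the variable names are illustrative, and broadcast variables and accumulators are covered in detail in a later chapter of the book:

// broadcast: a read-only value shipped once to every executor
val factor = spark.sparkContext.broadcast(10)
// accumulator: a write-only counter that executors add to and the driver reads
val oddCount = spark.sparkContext.longAccumulator("oddNumbers")

val nums = spark.sparkContext.parallelize(1 to 20)
val scaled = nums.map { n =>
  if (n % 2 == 1) oddCount.add(1)
  n * factor.value
}
scaled.count()   // an action triggers the computation
oddCount.value   // 10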

Types of RDDs

  • Generic RDD
  • Key-value RDD, which provides extra operations such as aggregation by key (a short sketch of both follows)
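
A short sketch of both flavors (the values are illustrative, not from the book):

// a "generic" RDD of plain strings
val generic = spark.sparkContext.parallelize(Seq("Spark", "Made", "Simple"))

// a key-value RDD: an RDD of 2-tuples, which unlocks extra methods
// (reduceByKey, partitioner support, ...) via PairRDDFunctions
val keyValue = generic.map(word => (word.toLowerCase, 1))
keyValue.reduceByKey(_ + _).collect()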

Properties

Every RDD is internally characterized by five main properties (the sketch after this list shows where they surface in the API):

  1. Preferred locations: optionally, a list of preferred locations on which to compute each split (e.g., block locations for a Hadoop Distributed File System [HDFS] file)
  2. Partitioner: optionally, a Partitioner for key-value RDDs (e.g., to say that the RDD is hash-partitioned)
  3. Dependencies: a list of dependencies on other RDDs
  4. Split computation: a function for computing each split
  5. Partitions: a list of partitions
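
A minimal sketch, assuming a spark-shell session, of where these properties surface on the public RDD API (the pairs RDD is illustrative, not from the book):

import org.apache.spark.rdd.RDD

val pairs: RDD[(String, Int)] = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2)), 2)

pairs.preferredLocations(pairs.partitions(0))  // 1. preferred locations for a split
pairs.partitioner                              // 2. Option[Partitioner] for key-value RDDs
pairs.dependencies                             // 3. dependencies on other RDDs
pairs.partitions                               // 5. the list of partitions
// 4. "a function for computing each split" is RDD.compute(), which Spark's
//    scheduler calls internally rather than user code calling it directly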

Creating RDDs

Interoperating Between DataFrames, Datasets, and RDDs

  1. Dataset[T] -> RDD[T]

// in Scala: converts a Dataset[Long] to RDD[Long]
spark.range(500).rdd

  2. DataFrame -> RDD[T]

Because a DataFrame is a Dataset of Row objects, to operate on this data you will need to convert each Row to the correct data type or extract values out of it.

// in Scala: DataFrame -> RDD[Long]
spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))

  3. RDD -> DataFrame

// in Scala
spark.range(10).rdd.toDF()

From a Local Collection

// in Scala
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2)
words.setName("myWords")

From Data Sources

1. Read a text file line by line:

spark.sparkContext.textFile("/some/path/withTextFiles")

2. Read each whole file as a single record:

spark.sparkContext.wholeTextFiles("/some/path/withTextFiles")

Manipulating RDDs

All operations on RDDs are expressed in a functional-programming style, and, unlike DataFrame operations, they manipulate raw Java or Scala objects.

Transformations

filter

Takes a function (method) that returns a Boolean.

// in Scala
def startsWithS(individual: String) = individual.startsWith("S")
words.filter(word => startsWithS(word)).collect()
val words2 = words.map(word => (word, word(0), word.startsWith("S")))
words2.filter(record => record._3).take(5)

sort

To sort an RDD, use sortBy: specify a function to extract a value from the objects in the RDD and sort based on it.

words.sortBy(word => word.length() * -1).take(2)

distinct

words.distinct().count()

map

Applies the given function to each record: one input record in, one output record out.

// in Scala
val words2 = words.map(word => (word, word(0), word.startsWith("S")))

flatMap

Sometimes each input record should produce multiple output records. flatMap takes a function that returns an Iterable and flattens the results.

// in Scala
words.flatMap(word => word.toSeq).take(5)

randomSplit

Randomly splits an RDD into an Array of RDDs according to the supplied weights.

// in Scala
val fiftyFiftySplit = words.randomSplit(Array[Double](0.5, 0.5))

Actions

Actions either collect data to the driver or write to an external data source

reduce

You can use the reduce method to specify a function to “reduce” an RDD of any kind of value to one value

// in Scala
spark.sparkContext.parallelize(1 to 20).reduce(_ + _) // 210
// in Scala
def wordLengthReducer(leftWord: String, rightWord: String): String = {
  if (leftWord.length > rightWord.length)
    return leftWord
  else
    return rightWord
}
words.reduce(wordLengthReducer)

take

words.take(5)
words.takeOrdered(5)
words.top(5)
val withReplacement = true
val numberToTake = 6
val randomSeed = 100L
words.takeSample(withReplacement, numberToTake, randomSeed)

take and its derivative methods take a number of values from your RDD. This works by first scanning one partition and then using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

count

words.count()
val confidence = 0.95
val timeoutMilliseconds = 400
// returns an approximate count if the computation exceeds the timeout
words.countApprox(timeoutMilliseconds, confidence)
// takes the relative accuracy as its argument
words.countApproxDistinct(0.05)
// loads the resulting map into the driver's memory
words.countByValue()
words.countByValueApprox(1000, 0.95)

first, max, and min

The first method returns the first value in the dataset; max and min return the maximum and minimum values, respectively:

words.first()
spark.sparkContext.parallelize(1 to 20).max()
spark.sparkContext.parallelize(1 to 20).min()

Saving Files

Saving files means writing to plain-text files. With RDDs, you cannot actually “save” to a data source in the conventional sense. You must iterate over the partitions in order to save the contents of each partition to some external database. This is a low-level approach that reveals the underlying operation that is being performed in the higher-level APIs.

saveAsTextFile

words.saveAsTextFile("file:/tmp/bookTitle")
import org.apache.hadoop.io.compress.BZip2Codec
words.saveAsTextFile("file:/tmp/bookTitleCompressed", classOf[BZip2Codec])

SequenceFiles

A sequenceFile is a flat file consisting of binary key–value pairs. It is extensively used in MapReduce as input/output formats.

words.saveAsObjectFile("/tmp/my/sequenceFilePath")
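
The example above writes an object file. As an additional hedged sketch (the path is illustrative, not from the book), a key-value RDD can also be written and read back as an actual sequence file:

import org.apache.hadoop.io.{IntWritable, Text}

// write: the implicit conversion to SequenceFileRDDFunctions handles the
// String/Int -> Writable conversion
words.map(word => (word, 1)).saveAsSequenceFile("/tmp/my/sequenceFileKV")

// read back, converting the Writables into plain Scala types
spark.sparkContext
  .sequenceFile("/tmp/my/sequenceFileKV", classOf[Text], classOf[IntWritable])
  .map { case (k, v) => (k.toString, v.get) }
  .take(5)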

Hadoop Files

For the file formats that Hadoop supports, see Hadoop: The Definitive Guide.
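
A hedged sketch (output path and format chosen for illustration, not from the book) of writing a key-value RDD through one of Hadoop's output-format APIs:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// write (word, 1) pairs using the "new" Hadoop MapReduce output-format API
words.map(word => (new Text(word), new IntWritable(1)))
  .saveAsNewAPIHadoopFile[TextOutputFormat[Text, IntWritable]]("/tmp/hadoopFileOutput")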

Caching

  • cache stores the results in memory
  • persist lets you specify a StorageLevel
// in Scala
import org.apache.spark.storage.StorageLevel
words.getStorageLevel
words.cache()
// persist on a fresh RDD; the storage level of an already-cached RDD cannot be changed
val rdd1 = spark.sparkContext.parallelize(1 to 100)
rdd1.persist(StorageLevel.MEMORY_AND_DISK)

Checkpointing

Checkpointing is the act of saving an RDD to disk so that future references to this RDD point to those intermediate partitions on disk rather than recomputing the RDD from its original source.

spark.sparkContext.setCheckpointDir("/some/path/for/checkpointing")
words.checkpoint()

Pipe RDDs to System Commands

pipe returns an RDD created by piping each partition's elements, one per line, to the standard input of a forked external process; the process's output lines become the elements of the resulting RDD. Piping to wc -l therefore yields one line count per partition.

words.pipe("wc -l").collect()

mapPartitions

Maps over each individual partition.

This function is similar to map, except that the mapping function receives an iterator over each partition of the RDD rather than each individual element. If the mapping needs to frequently create extra objects, mapPartitions is much more efficient than map (see the sketch after the code below).

The result of the first line of code below is the number of partitions in words.

// in Scala
words.mapPartitions(part => Iterator[Int](1)).sum()

// standalone-application example: builds its own SparkContext
// (in spark-shell, use the existing spark.sparkContext instead)
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

val sc = new SparkContext(new SparkConf().setAppName("map_mapPartitions_demo").setMaster("local"))
val arrayRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
arrayRDD.mapPartitions { elements =>
  val result = new ArrayBuffer[Int]()
  elements.foreach(element => result += element)
  result.iterator
}.foreach(println)
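
A sketch of why this matters for per-record setup costs; ExpensiveParser is a hypothetical stand-in for any costly resource (a parser, a database connection, ...), not a real library class:

class ExpensiveParser extends Serializable {   // hypothetical, illustrative helper
  def parse(s: String): String = s.toUpperCase
}

// with map, a helper would conceptually be built once per record:
// words.map(word => new ExpensiveParser().parse(word))

// with mapPartitions, one helper per partition is reused for all of its records
words.mapPartitions { iter =>
  val parser = new ExpensiveParser()
  iter.map(parser.parse)
}.take(5)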

mapPartitionsWithIndex

Works like mapPartitions, except that the function receives two arguments; the first is the partition index.

// in Scala
def indexedFunc(partitionIndex: Int, withinPartIterator: Iterator[String]) = {
  withinPartIterator.toList.map(
    value => s"Partition: $partitionIndex => $value").iterator
}
words.mapPartitionsWithIndex(indexedFunc).collect()

foreachPartition

foreachPartition is like mapPartitions except that it has no return value; it simply iterates over each partition for its side effects, which makes it a good fit for writing each partition out to an external sink.

words.foreachPartition { iter =>
  import java.io._
  import scala.util.Random
  val randomFileName = new Random().nextInt()
  val pw = new PrintWriter(new File(s"/tmp/random-file-${randomFileName}.txt"))
  while (iter.hasNext) {
    pw.write(iter.next())
  }
  pw.close()
}

Glom

glom is an interesting function that takes every partition in your dataset and converts them to arrays. This can be useful if you’re going to collect the data to the driver and want to have an array for each partition

spark.sparkContext.parallelize(Seq("Hello", "World"), 2).glom().collect()
// Array(Array(Hello), Array(World))